package com.atguigu.app.dws;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TrafficHomeDetailPageViewBean;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyClickHouseUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @className: DwsTrafficPageViewWindow
 * @author: LinCong
 * @description: 流量域页面浏览各窗口汇总表  (从 Kafka 页面日志主题读取数据，统计当日的首页和商品详情页独立访客数。)
 * @date: 2023/2/10 13:35
 * @version: 1.0
 */

//日志服务器（.log）-> flume -> kafka -> flink(BaseLogApp) -> kafka -> flink(DwsTrafficPageViewWindow) -> clickhouse
public class DwsTrafficPageViewWindow {
    public static void main(String[] args) throws Exception {
//        todo 1、获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        ////        1.1、开启checkpoint
//        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
//        //设置checkpoint的超时时间,如果 Checkpoint在 10分钟内尚未完成说明该次Checkpoint失败,则丢弃。(默认10分钟)
//        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(120000L);
//        //固定延迟重启   （最多重启次数，每次重启的时间间隔）
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
////        1.2、设置状态后端
//        env.setStateBackend(new HashMapStateBackend());
//        System.setProperty("HADOOP_USER_NAME", "kevin");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop3cluster/211126/ck");

//        todo 2、读取 kafka 页面日志主题数据创建流
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_page_view_window";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

//        todo 3、将每行数据转换为JSON对象并过滤（首页与商品详情页）
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                String pageId = jsonObject.getJSONObject("page").getString("page_id");
                if ("home".equals(pageId) || "good_detail".equals(pageId)) {
                    out.collect(jsonObject);
                }
            }
        });

//        todo 4、提取事件时间生成 watermark
        SingleOutputStreamOperator<JSONObject> jsonObjWithWatermarkDS = jsonObjDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        })
        );

//        todo 5、按照mid分组
        KeyedStream<JSONObject, String> keyedStream = jsonObjWithWatermarkDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

//        todo 6、使用状态编程过滤出首页与商品详情页的独立访客
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> trafficHomeDetailDS = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, TrafficHomeDetailPageViewBean>() {
            private ValueState<String> homeLastState;
            private ValueState<String> detailLastState;
            private TrafficHomeDetailPageViewBean trafficHomeDetailPageViewBean;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> homeStateDes = new ValueStateDescriptor<String>("home_state_des", String.class);
                ValueStateDescriptor<String> detailStateDes = new ValueStateDescriptor<String>("detail_state_des", String.class);

                StateTtlConfig stateTtlConfig = new StateTtlConfig.Builder(Time.days(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                homeStateDes.enableTimeToLive(stateTtlConfig);
                detailStateDes.enableTimeToLive(stateTtlConfig);

                homeLastState = getRuntimeContext().getState(homeStateDes);
                detailLastState = getRuntimeContext().getState(detailStateDes);

                trafficHomeDetailPageViewBean=new TrafficHomeDetailPageViewBean();

            }

            @Override
            public void flatMap(JSONObject value, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                //获取状态数据以及当前数据中的日期
                Long ts = value.getLong("ts");
                String curDt = DateFormatUtil.toDate(ts);
                String homeLastDt = homeLastState.value();
                String detailLastDt = detailLastState.value();

                long homeUvCt = 0L;
                long gooodDetailUvCt = 0L;

                if ("home".equals(value.getJSONObject("page").getString("page_id"))) {
                    if (homeLastDt == null || !homeLastDt.equals(curDt)) {
                        homeUvCt = 1L;
                        homeLastState.update(curDt);
                    }
                } else {
                    if (detailLastDt == null || !detailLastDt.equals(curDt)) {
                        gooodDetailUvCt = 1L;
                        detailLastState.update(curDt);
                    }
                }

                if (homeUvCt == 1L || gooodDetailUvCt == 1L) {
                    trafficHomeDetailPageViewBean.setStt("");
                    trafficHomeDetailPageViewBean.setEdt("");
                    trafficHomeDetailPageViewBean.setHomeUvCt(homeUvCt);
                    trafficHomeDetailPageViewBean.setGoodDetailUvCt(gooodDetailUvCt);
                    trafficHomeDetailPageViewBean.setTs(ts);

                    out.collect(new TrafficHomeDetailPageViewBean("", "", homeUvCt, gooodDetailUvCt, ts));
                }
            }
        });

//        todo 7、开窗聚合
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> resultDS = trafficHomeDetailDS.windowAll(TumblingEventTimeWindows
                .of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TrafficHomeDetailPageViewBean>() {
                    @Override
                    public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
                        value1.setHomeUvCt(value1.getHomeUvCt() + value2.getHomeUvCt());
                        value1.setGoodDetailUvCt(value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt());
                        return value1;
                    }
                }, new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                        TrafficHomeDetailPageViewBean next = values.iterator().next();

                        next.setTs(System.currentTimeMillis());
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));

                        out.collect(next);
                    }
                });

//        todo 8、将数据写出到clickhouse
//        trafficHomeDetailDS.print("trafficHomeDetailDS>>>>>>");
        resultDS.print("resultDS>>>>>>");

        resultDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_traffic_page_view_window values(?,?,?,?,?)"));

//        todo 9、启动任务
        env.execute();
    }
}
